Oozie Coordinator Specification

这文档的目的是定义coordinator引擎系统指定提交基于时间和数据触发的WF.

1. Coordinator Overview

上一章节我们学习了怎么定义多个WF作业. 通常WF可以基于常规周期时间/有效数据来运行,有时由外部事件触发。WF作业满足应用数据/时间/外部事件的条件时启动。连接多个WF结果作为一个数据应用管线。Oozie Coordinator系统定义和执行重复和独立WF作业。真实数据应用管线必须解释处理,延迟处理,catchup,分布处理,监控,通知和SLA。

定义:Actual time,Nominal(名义) time,Synchronous DataSet

参数化表达式语言:可以定义参数化变量,内置常量和内置函数,执行时所有参数分解到具体值,使用语言为JSP EL。EL可以用于XML特性值和XML元素值,不能用于XML元素和XML特性名称

2. 时间,周期展现

Oozie处理coordinator使用固定时区,用于分解作业start/end时间,pause时间和数据集初始实例化。如果使用的是UTC时区,使用修饰符 Z,如果是其它UTC,必须加上GMT 偏移量, (+/-)#### .例如UTC 时间是2012-08-12T00:00Z , 相同时间是GMT+5:30 is 2012-08-12T05:30+0530

2.1 时间

如果处理时区为UTC,时间精度为分钟, 'YYYY-MM-DDTHH:mmZ',例如 2009-08-10T13:10Z是August 10th 2009 at 13:10 UTC.如果处理是 GMT 偏移量GMT(+/-)#### ,精度仍未分钟,格式'YYYY-MM-DDTHH:mmGMT(+/-)####'.例如, 2009-08-10T13:10+0530 是 August 10th 2009 at 13:10 GMT+0530, 印度时区.

表示一天结束使用24:00(i.e.2009-08-10T24:00Z ).但计算和显示时,都会把这种日期作为下一天的0小时, (i.e. 2009-08-11T00:00Z )

2.2 频繁和时间周期

EL Constant                 Value       Example
${coord:minutes(int n)}     n           ${coord:minutes(45)} --> 45
${coord:hours(int n)}       n * 60      ${coord:hours(3)} --> 180
${coord:days(int n)}        variable    ${coord:days(2)} --> minutes in 2 full days from the current date
${coord:months(int n)}      variable    ${coord:months(1)} --> minutes in a 1 full month from the current date
${cron syntax}              variable    ${0,10 15 * * 2-6} --> a job that runs every weekday at 3:00pm and 3:10pm UTC time展现 ###

The coord:days(int n) and coord:endOfDays(int n) EL functions

<coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                  start="2009-01-02T08:00Z" end="2009-01-04T08:00Z" timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1">
      <controls>
        <timeout>10</timeout>
        <concurrency>${concurrency_level}</concurrency>
        <execution>${execution_order}</execution>
        <throttle>${materialization_throttle}</throttle>
      </controls>      <datasets>
       <dataset name="din" frequency="${coord:endOfDays(1)}"
                initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
         <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
       <dataset name="dout" frequency="${coord:minutes(30)}"
                initial-instance="2009-01-02T08:00Z" timezone="UTC">
         <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
      </datasets>
      <input-events>
         <data-in name="input" dataset="din">
                <instance>${coord:current(0)}</instance>
         </data-in>
      </input-events>
      <output-events>
         <data-out name="output" dataset="dout">
                <instance>${coord:current(1)}</instance>
         </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>${wf_app_path}</app-path>
          <configuration>
              <property>
              <name>wfInput</name>
              <value>${coord:dataIn('input')}</value>
            </property>
            <property>
              <name>wfOutput</name>
              <value>${coord:dataOut('output')}</value>
            </property>
         </configuration>
       </workflow>
      </action>
 </coordinator-app>

The coord:months(int n) and coord:endOfMonths(int n) EL functions

<coordinator-app name="hello-coord" frequency="${coord:months(1)}"
                  start="2009-01-02T08:00Z" end="2009-04-02T08:00Z" timezone="America/Los_Angeles"
                 xmlns="uri:oozie:coordinator:0.1">
      <controls>
        <timeout>10</timeout>
        <concurrency>${concurrency_level}</concurrency>
        <execution>${execution_order}</execution>
        <throttle>${materialization_throttle}</throttle>
      </controls>      <datasets>
       <dataset name="din" frequency="${coord:endOfMonths(1)}"
                initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
         <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
       <dataset name="dout" frequency="${coord:minutes(30)}"
                initial-instance="2009-01-02T08:00Z" timezone="UTC">
         <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
      </datasets>
      <input-events>
         <data-in name="input" dataset="din">
                <instance>${coord:current(0)}</instance>
         </data-in>
      </input-events>
      <output-events>
         <data-out name="output" dataset="dout">
                <instance>${coord:current(1)}</instance>
         </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>${wf_app_path}</app-path>
          <configuration>
              <property>
              <name>wfInput</name>
              <value>${coord:dataIn('input')}</value>
            </property>
            <property>
              <name>wfOutput</name>
              <value>${coord:dataOut('output')}</value>
            </property>
         </configuration>
       </workflow>
      </action>
 </coordinator-app>

Cron 语法

运行作业分解为minutes, hours, days or weeks,这样需要持续运行整年,编译一个查询索引来提交到实时网站中,但很多例子中不适用,比如 导出数据到一个报表系统用于业务分析。通宵运行作业而没有分析会是一种浪费。 Cron 是一个标准基于时间作业调度机制在类unix的操作系统,用于拓展系统管理员来启动作业和维护软件环境。Cron 语法通常有五个字段组成:minutes, hours, date of month, month, and day of week,尽管多个变量不存在。

<coordinator-app name="cron-coord" frequency="0/10 1/2 * * *" start="${start}" end="${end}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
        <action>
        <workflow>
            <app-path>${workflowAppUri}</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

Cron表达式包含5个必须字段:

字段名称        允许值                 允许指定字符
Minutes         0-59                , - * /
Hours           0-23                , - * /
Day-of-month    1-31                , - * ? / L W
Month           1-12 or JAN-DEC         , - * /
Day-of-Week         1-7 or SUN-SAT      , - * ? / L #

说明:
    '*' 所有值
    '?' 仅用于day-of-month和day-of-week字段
    '-' 指定范围 "10-12" -> "the hours 10, 11 and 12"
    ',' 额外值, "MON,WED,FRI" -> "the days Monday, Wednesday, and Friday"
    '/' 指定增量,
            "0/15" ->"the minutes 0, 15, 30, and 45"
            "5/15" ->"the minutes 5, 20, 35, and 50"
            '/'前指定'*'等效 从0开始
            只是用来开启关闭作业执行
    'L' 用于 day-of-month和 day-of-week字段,-> 'last'  
            but it has different meaning in each of the two fields. For example, the value "L" in the day-of-month field means "the last day of the month" - day 31 for January, day 28 for February on non-leap years. If used in the day-of-week field by itself, it simply means "7" or "SAT". But if used in the day-of-week field after another value, it means "the last xxx day of the month" - for example "6L" means "the last friday of the month". You can also specify an offset from the last day of the month, such as "L-3" which would mean the third-to-last day of the calendar month. When using the 'L' option, it is important not to specify lists, or ranges of values, as you'll get confusing/unexpected results.
    'W' 在day-of-month字段有效,用于指定最近的工作日,
            "15W"-> 当月15th的最近的工作日,
            is: "the nearest weekday to the 15th of the month". So if the 15th is a Saturday, the trigger will fire on Friday the 14th. If the 15th is a Sunday, the trigger will fire on Monday the 16th. If the 15th is a Tuesday, then it will fire on Tuesday the 15th. However if you specify "1W" as the value for day-of-month, and the 1st is a Saturday, the trigger will fire on Monday the 3rd, as it will not 'jump' over the boundary of a month's days. The 'W' character can only be specified when the day-of-month is a single day, not a range or list of days.
     'LW'  ->"last weekday of the month"
     '#' 用于 day-of-week字段,指定某月的"the nth" XXX 
            "6#3" -> third Friday of the month(day 6 = Friday and "#3" = the 3rd one in the month)
            "2#1" ->  the first Monday of the month
            "4#5" ->  the fifth Wednesday of the month
            "#5" -> 
            有且仅能用一次 '#',"3#1,6#3"将无效

    无效cron表达式将不会运行,并抛异常,如"0 10 30 2 *" 
    coordinator作业没有action,如 frequency of "0 10 * * *" with start time of 2013-10-18T21:00Z and end time of 2013-10-18T22:00Z ,提交将被拒绝,并抛异常。

案例:

    Cron Expression     Meaning
    10 9 * * *          Runs everyday at 9:10am
    10,30,45 9 * * *    Runs everyday at 9:10am, 9:30am, and 9:45am
    0 * 30 JAN 2-6      Runs at 0 minute of every hour on weekdays and 30th of January
    0/20 9-17 * * 2-5   Runs every Mon, Tue, Wed, and Thurs at minutes 0, 20, 40 from 9am to 5pm
    1 2 L-3 * *             Runs every third-to-last day of month at 2:01am
    1 2 6W 3 ?          Runs on the nearest weekday to March, 6th every year at 2:01am
    1 2 * 3 3#2         Runs every second Tuesday of March at 2:01am every year
    0 10,13 * * MON-FRI     Runs every weekday at 10am and 1pm

注意:

Cron expression and syntax in Oozie are inspired by Quartz:http://quartz-scheduler.org/api/2.0.0/org/quartz/CronExpression.html. However, there is a major difference between Quartz cron and Oozie cron in which Oozie cron doesn't have "Seconds" field since everything in Oozie functions at the minute granularity at most. Everything related to Oozie cron syntax should be based on the documentation in the Oozie documentation.

Cron expression uses oozie server processing timezone. Since default oozie processing timezone is UTC, if you want to run a job on every weekday at 10am in Tokyo, Japan(UTC + 9), your cron expression should be "0 1 * 2-6" instead of the "0 10 * 2-6" which you might expect.

Overflowing ranges is supported but strongly discouraged - that is, having a larger number on the left hand side than the right. You might do 22-2 to catch 10 o'clock at night until 2 o'clock in the morning, or you might have NOV-FEB. It is very important to note that overuse of overflowing ranges creates ranges that don't make sense and no effort has been made to determine which interpretation CronExpression chooses. An example would be "0 14-6 ? * FRI-MON".

3. 数据集 ## (未完成,待整理)

数据收集通过逻辑名称,展现为唯一URI集合,可独立引用,可由多个独立标识的URI组成,或单个URI。典型定义在业务域中心化位置,并可以通过coordinator访问,一次定义多次使用。数据集市同步输入(常规时间周期生成),被作业消费,可以认为是不可变的。

同步数据集

由周期时间生成,也被称之为 时钟数据集

定义信息包含:

name: The dataset name. It must be a valid Java identifier. frequency: 周期创建时间.El: ${5 * HOUR}. initial-instance: The UTC datetime of the initial instance of the dataset. The initial-instance also provides the baseline datetime to compute instances of the dataset using multiples of the frequency. timezone: The timezone of the dataset. uri-template: The URI template that identifies the dataset and can be resolved into concrete URIs to identify a particular dataset instance. The URI template is constructed using: constants See the allowable EL Time Constants below. Ex: ${YEAR}/${MONTH}. variables Variables must be resolved at the time a coordinator job is submitted to the coordinator engine. They are normally provided a job parameters (configuration properties). Ex: ${market}/${language} done-flag: This flag denotes when a dataset instance is ready to be consumed. If the done-flag is omitted the coordinator will wait for the presence of a _SUCCESS file in the directory (Note: MapReduce jobs create this on successful completion automatically). If the done-flag is present but empty, then the existence of the directory itself indicates that the dataset is ready. If the done-flag is present but non-empty, Oozie will check for the presence of the named file within the directory, and will be considered ready (done) when the file exists.

EL常量用用同步数据集URI模板:

EL 常数   结果格式        备注
YEAR    YYYY        4 digits representing the year
MONTH   DD      2 digits representing the month of the year, January = 1
DAY     DD      2 digits representing the day of the month
HOUR    HH      2 digits representing the hour of the day, in 24 hour format, 0 - 23
MINUTE  mm      2 digits representing the minute of the hour, 0 - 59

同步语法

<dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>[URI TEMPLATE]</uri-template>
    <done-flag>[FILE NAME]</done-flag>
</dataset>

实例:

1. A dataset produced once every day at 00:15 PST8PDT and done-flag is set to empty:

<dataset name="logs" frequency="${coord:days(1)}"
       initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles">
    <uri-template>
      hdfs://foo:8020/app/logs/${market}/${YEAR}${MONTH}/${DAY}/data
    </uri-template>
    <done-flag></done-flag>
</dataset>

数据集将分解为下面URI并且coordinator检查目录是否存在:

[market] will be replaced with user given property.  hdfs://foo:8020/usr/app/[market]/2009/02/15/data
  hdfs://foo:8020/usr/app/[market]/2009/02/16/data
  hdfs://foo:8020/usr/app/[market]/2009/02/17/data

2. A dataset available on the 10th of each month and done-flag is default '_SUCCESS':

<dataset name="stats" frequency="${coord:months(1)}"
       initial-instance="2009-01-10T10:00Z" timezone="America/Los_Angeles">
    <uri-template>hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data</uri-template>
</dataset>

数据分解为URI:

hdfs://foo:8020/usr/app/stats/2009/01/data
hdfs://foo:8020/usr/app/stats/2009/02/data
hdfs://foo:8020/usr/app/stats/2009/03/data

The dataset instances are not ready until '_SUCCESS' exists in each path:

hdfs://foo:8020/usr/app/stats/2009/01/data/_SUCCESS
hdfs://foo:8020/usr/app/stats/2009/02/data/_SUCCESS
hdfs://foo:8020/usr/app/stats/2009/03/data/_SUCCESS

3. A dataset available at the end of every quarter and done-flag is 'trigger.dat':

hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data trigger.dat

The dataset would resolve to the following URIs:

hdfs://foo:8020/usr/app/stats/2009/01/data hdfs://foo:8020/usr/app/stats/2009/04/data hdfs://foo:8020/usr/app/stats/2009/07/data ...

The dataset instances are not ready until 'trigger.dat' exists in each path:

hdfs://foo:8020/usr/app/stats/2009/01/data/trigger.dat hdfs://foo:8020/usr/app/stats/2009/04/data/trigger.dat hdfs://foo:8020/usr/app/stats/2009/07/data/trigger.dat ...

4. Normally the URI template of a dataset has a precision similar to the frequency:

hdfs://foo:8020/usr/app/logs/${YEAR}/${MONTH}/${DAY}/data

The dataset would resolve to the following URIs:

hdfs://foo:8020/usr/app/logs/2009/01/01/data hdfs://foo:8020/usr/app/logs/2009/01/02/data hdfs://foo:8020/usr/app/logs/2009/01/03/data ... 5. However, if the URI template has a finer precision than the dataset frequency:

hdfs://foo:8020/usr/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data

The dataset resolves to the following URIs with fixed values for the finer precision template variables:

hdfs://foo:8020/usr/app/logs/2009/01/01/10/30/data hdfs://foo:8020/usr/app/logs/2009/01/02/10/30/data hdfs://foo:8020/usr/app/logs/2009/01/03/10/30/data ...

5.2. Dataset URI-Template types

Each dataset URI could be a HDFS path URI denoting a HDFS directory: hdfs://foo:8020/usr/logs/20090415 or a HCatalog partition URI identifying a set of table partitions: hcat://bar:8020/logsDB/logsTable/dt=20090415;region=US.

HCatalog enables table and storage management for Pig, Hive and MapReduce. The format to specify a HCatalog table partition URI is hcat://[metastore server]:[port]/[database name]/[table name]/[partkey1]=[value];[partkey2]=[value];...

<dataset name="logs" frequency="${coord:days(1)}"
           initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles">
    <uri-template>
      hcat://myhcatmetastore:9080/database1/table1/myfirstpartitionkey=myfirstvalue;mysecondpartitionkey=mysecondvalue
    </uri-template>
    <done-flag></done-flag>
</dataset>

5.4. Dataset Definitions

Dataset definitions are grouped in XML files. IMPORTANT: Please note that if an XML namespace version is specified for the coordinator-app element in the coordinator.xml file, no namespace needs to be defined separately for the datasets element (even if the dataset is defined in a separate file). Specifying it at multiple places might result in xml errors while submitting the coordinator job.

Syntax:

 <!-- Synchronous datasets -->
<datasets>
  <include>[SHARED_DATASETS]</include>
  ...
  <dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>[URI TEMPLATE]</uri-template>
  </dataset>
  ...
</datasets>

Example:

<datasets>
.
  <include>hdfs://foo:8020/app/dataset-definitions/globallogs.xml</include>
.
  <dataset name="logs" frequency="${coord:hours(12)}"
           initial-instance="2009-02-15T08:15Z" timezone="Americas/Los_Angeles">
    <uri-template>
    hdfs://foo:8020/app/logs/${market}/${YEAR}${MONTH}/${DAY}/${HOUR}/${MINUTE}/data
    </uri-template>
  </dataset>
.
    <dataset name="stats" frequency="${coord:months(1)}"
               initial-instance="2009-01-10T10:00Z" timezone="Americas/Los_Angeles">
        <uri-template>hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data</uri-template>
    </dataset>
.
</datasets>

Coordinator Application

Coordinator 作业状态:PREP, RUNNING, RUNNINGWITHERROR, PREPSUSPENDED, SUSPENDED, SUSPENDEDWITHERROR, PREPPAUSED, PAUSED, PAUSEDWITHERROR, SUCCEEDED, DONEWITHERROR, KILLED, FAILED

状态过渡:

PREP --> PREPSUSPENDED | PREPPAUSED | RUNNING | KILLED
RUNNING --> RUNNINGWITHERROR | SUSPENDED | PAUSED | SUCCEEDED | KILLED
RUNNINGWITHERROR --> RUNNING | SUSPENDEDWITHERROR | PAUSEDWITHERROR | DONEWITHERROR | KILLED | FAILED
PREPSUSPENDED --> PREP | KILLED
SUSPENDED --> RUNNING | KILLED
SUSPENDEDWITHERROR --> RUNNINGWITHERROR | KILLED
PREPPAUSED --> PREP | KILLED
PAUSED --> SUSPENDED | RUNNING | KILLED
PAUSEDWITHERROR --> SUSPENDEDWITHERROR | RUNNINGWITHERROR | KILLED
FAILED | KILLED --> IGNORED
IGNORED --> RUNNING

Coordinator Action

Coordinator 作业有个驱动事件决定创建Coordinator活动;

Action状态过渡:

WAITING --> READY | TIMEDOUT | SKIPPED | KILLED
READY --> SUBMITTED | SKIPPED | KILLED
SUBMITTED --> RUNNING | KILLED | FAILED
RUNNING --> SUCCEEDED | KILLED | FAILED
FAILED | KILLED | TIMEDOUT --> IGNORED
IGNORED --> WAITING

输入事件: 输出事件:

#### 同步Coordinator应用定义 #### start end timezone frequency Control : timeout concurrency execution: FIFO (oldest first) default . LIFO (newest first). LAST_ONLY (see explanation below). NONE (discards all older materialization, see explanation below). throttle datasets input-events data-in name dataset instance start-instance end-instance output-events: data-out: name dataset instance action workflow 语法

<coordinator-app name="[NAME]" frequency="[FREQUENCY]"
                    start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]"
                    xmlns="uri:oozie:coordinator:0.1">
      <controls>
        <timeout>[TIME_PERIOD]</timeout>
        <concurrency>[CONCURRENCY]</concurrency>
        <execution>[EXECUTION_STRATEGY]</execution>
      </controls>
      <datasets>
        <include>[SHARED_DATASETS]</include>
        ...
        <!-- Synchronous datasets -->
        <dataset name="[NAME]" frequency="[FREQUENCY]"
                 initial-instance="[DATETIME]" timezone="[TIMEZONE]">
          <uri-template>[URI_TEMPLATE]</uri-template>
        </dataset>
        ...
      </datasets>
      <input-events>
        <data-in name="[NAME]" dataset="[DATASET]">
          <instance>[INSTANCE]</instance>
          ...
        </data-in>
        ...
        <data-in name="[NAME]" dataset="[DATASET]">
          <start-instance>[INSTANCE]</start-instance>
          <end-instance>[INSTANCE]</end-instance>
        </data-in>
        ...
      </input-events>
      <output-events>
         <data-out name="[NAME]" dataset="[DATASET]">
           <instance>[INSTANCE]</instance>
         </data-out>
         ...
      </output-events>
      <action>
        <workflow>
          <app-path>[WF-APPLICATION-PATH]</app-path>
          <configuration>
            <property>
              <name>[PROPERTY-NAME]</name>
              <value>[PROPERTY-VALUE]</value>
            </property>
            ...
         </configuration>
       </workflow>
      </action>
   </coordinator-app>

一天执行一次:

   <coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                    start="2009-01-02T08:00Z" end="2009-01-02T08:00Z"
                    timezone="America/Los_Angeles"
                    xmlns="uri:oozie:coordinator:0.1">
      <datasets>
        <dataset name="logs" frequency="${coord:days(1)}"
                 initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
          <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/data</uri-template>
        </dataset>
        <dataset name="siteAccessStats" frequency="${coord:days(1)}"
                 initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
          <uri-template>hdfs://bar:8020/app/stats/${YEAR}/${MONTH}/${DAY}/data</uri-template>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="input" dataset="logs">
          <instance>2009-01-02T08:00Z</instance>
        </data-in>
      </input-events>
      <output-events>
         <data-out name="output" dataset="siteAccessStats">
           <instance>2009-01-02T08:00Z</instance>
         </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
          <configuration>
            <property>
              <name>wfInput</name>
              <value>${coord:dataIn('input')}</value>
            </property>
            <property>
              <name>wfOutput</name>
              <value>${coord:dataOut('output')}</value>
            </property>
         </configuration>
       </workflow>
      </action>
   </coordinator-app>

多个时间执行:

   <coordinator-app name="hello-coord" frequency="${coord:days(1)}"
                    start="2009-01-02T08:00Z" end="2010-01-02T08:00Z"
                    timezone="America/Los_Angeles"
                    xmlns="uri:oozie:coordinator:0.1">
      <datasets>
        <dataset name="logs" frequency="${coord:days(1)}"
                 initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
          <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/data</uri-template>
        </dataset>
        <dataset name="siteAccessStats" frequency="${coord:days(1)}"
                 initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
          <uri-template>hdfs://bar:8020/app/stats/${YEAR}/${MONTH}/${DAY}/data</uri-template>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="input" dataset="logs">
          <instance>${coord:current(0)}</instance>
        </data-in>
      </input-events>
      <output-events>
         <data-out name="output" dataset="siteAccessStats">
           <instance>${coord:current(0)}</instance>
         </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
          <configuration>
            <property>
              <name>wfInput</name>
              <value>${coord:dataIn('input')}</value>
            </property>
            <property>
              <name>wfOutput</name>
              <value>${coord:dataOut('output')}</value>
            </property>
         </configuration>
       </workflow>
      </action>
   </coordinator-app>

Coordinator Job that executes its coordinator action multiple times and as input takes multiple dataset instances:

'logs'是同步数据集每天的24:00时执行.

'weeklystats'为同步数据集,每7天的 24:00执行一次.

   <coordinator-app name="hello2-coord" frequency="${coord:days(7)}"
                    start="2009-01-07T24:00Z" end="2009-12-12T24:00Z"
                    timezone="UTC"
                    xmlns="uri:oozie:coordinator:0.1">
      <datasets>
        <dataset name="logs" frequency="${coord:days(1)}"
                 initial-instance="2009-01-01T24:00Z" timezone="UTC">
          <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}</uri-template>
        </dataset>
        <dataset name="weeklySiteAccessStats" frequency="${coord:days(7)}"
                 initial-instance="2009-01-07T24:00Z" timezone="UTC">
          <uri-template>hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY}</uri-template>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="input" dataset="logs">
          <start-instance>${coord:current(-6)}</start-instance>
          <end-instance>${coord:current(0)}</end-instance>
        </data-in>
      </input-events>
      <output-events>
         <data-out name="output" dataset="siteAccessStats">
           <instance>${coord:current(0)}</instance>
         </data-out>
      </output-events>
      <action>
        <workflow>
          <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
          <configuration>
            <property>
              <name>wfInput</name>
              <value>${coord:dataIn('input')}</value>
            </property>
            <property>
              <name>wfOutput</name>
              <value>${coord:dataOut('output')}</value>
            </property>
         </configuration>
       </workflow>
      </action>
   </coordinator-app>

第一个coordinator action 分解为:

  <workflow>
    <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
    <configuration>
      <property>
        <name>wfInput</name>
        <value>
               hdfs://bar:8020/app/logs/200901/01,hdfs://bar:8020/app/logs/200901/02,
               hdfs://bar:8020/app/logs/200901/03,hdfs://bar:8020/app/logs/200901/05,
               hdfs://bar:8020/app/logs/200901/05,hdfs://bar:8020/app/logs/200901/06,
               hdfs://bar:8020/app/logs/200901/07
        </value>
      </property>
      <property>
        <name>wfOutput</name>
        <value>hdfs://bar:8020/app/stats/2009/01/07</value>
      </property>
    </configuration>
  </workflow>

第二个coordinator action 分解为:

  <workflow>
    <app-path>hdfs://bar:8020/usr/joe/logsprocessor-wf</app-path>
    <configuration>
      <property>
        <name>wfInput</name>
        <value>
               hdfs://bar:8020/app/logs/200901/08,hdfs://bar:8020/app/logs/200901/09,
               hdfs://bar:8020/app/logs/200901/10,hdfs://bar:8020/app/logs/200901/11,
               hdfs://bar:8020/app/logs/200901/12,hdfs://bar:8020/app/logs/200901/13,
               hdfs://bar:8020/app/logs/200901/16
        </value>
      </property>
      <property>
        <name>wfOutput</name>
        <value>hdfs://bar:8020/app/stats/2009/01/16</value>
      </property>
    </configuration>
  </workflow>

Coordinator Applications 参数化

   <coordinator-app name="app-coord" frequency="${coord:days(1)}"
                    start="${jobStart}" end="${jobEnd}" timezone="${timezone}"
                    xmlns="uri:oozie:coordinator:0.1">
      <datasets>
        <dataset name="logs" frequency="${coord:hours(1)}"
                 initial-instance="${logsInitialInstance}" timezone="${timezone}">
          <uri-template>
            hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR}
          </uri-template>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="input" dataset="logs">
          <start-instance>${coord:current(-23)}</start-instance>
          <end-instance>${coord:current(0)}</end-instance>
        </data-in>
      </input-events>
      <action>
        <workflow>
        ...
       </workflow>
      </action>
   </coordinator-app>

样例参数:

   <coordinator-app name="app-coord" frequency="${coord:days(1)}"
                    start="${jobStart}" end="${jobEnd}" timezone="${timezone}"
                    xmlns="uri:oozie:coordinator:0.1">
      <parameters>
          <property>
              <name>jobStart</name>
          </property>
          <property>
              <name>jobEnd</name>
              <value>2012-12-01T22:00Z</value>
          </property>
      </parameters>
      <datasets>
        <dataset name="logs" frequency="${coord:hours(1)}"
                 initial-instance="${logsInitialInstance}" timezone="${timezone}">
          <uri-template>
            hdfs://bar:8020/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR}
          </uri-template>
        </dataset>
      </datasets>
      <input-events>
        <data-in name="input" dataset="logs">
          <start-instance>${coord:current(-23)}</start-instance>
          <end-instance>${coord:current(0)}</end-instance>
        </data-in>
      </input-events>
      <action>
        <workflow>
        ...
       </workflow>
      </action>
   </coordinator-app>

同步数据集定义: . hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY} . hdfs://bar:8020/app/weeklystats/${YEAR}/${MONTH}/${DAY} .

数据集定义在'datasets.xml'中:

<datasets>  
    <dataset name="logs" frequency="${coord:hours(1)}"
           initial-instance="2009-01-01T01:00Z" timezone="UTC">
        <uri-template>hdfs://bar:8020/app/logs/${YEAR}${MONTH}/${DAY}/${HOUR}</uri-template>
    </dataset>
</datasets>

添加数据集:

<coordinator-app name="app-coord" frequency="${coord:days(1)}"
                start="2009-01-01T24:00Z" end="2009-12-31T24:00Z" timezone="UTC"
                xmlns="uri:oozie:coordinator:0.1">
    ...
    <datasets>
        <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include>
    </datasets>
    ...
</coordinator-app>


  <dataset name="15MinLogs" frequency="${coord:minutes(15)}"
           initial-instance="2009-01-01T00:15:00Z" timezone="UTC">
        <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
  </dataset>
  <dataset name="1HourLogs" frequency="${coord:hours(1)}"
           initial-instance="2009-01-01T01:00:00Z" timezone="UTC">
        <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
  </dataset>
  <dataset name="1DayLogs" frequency="${coord:hours(24)}"
           initial-instance="2009-01-01T24:00:00Z" timezone="UTC">
        <uri-template>hdfs://bar:8020/app/logs/${YEAR}/${MONTH}/${DAY}</uri-template>
  </dataset>



   <coordinator-app name="app-coord-hourly" frequency="${coord:hours(1)}"
                    start="2009-01-01T01:00Z" end="2009-12-31T24:00Z" timezone="UTC"
                    xmlns="uri:oozie:coordinator:0.1">
      <datasets>
        <include>hdfs://foo:8020/app/dataset-definitions/datasets.xml</include>
      </datasets>
      <input-events>
        <data-in name="input" dataset="15MinLogs">
          <start-instance>${coord:current(-3)}</start-instance>
          <end-instance>${coord:current(0)}</end-instance>
        </data-in>
      </input-events>
      <output-events>
        <data-out name="output" dataset="1HourLogs">
          <instance>${coord:current(0)}</instance>
        </data-out>
      </output-events>
      <action>
        <workflow>
        ...
       </workflow>
      </action>
   </coordinator-app>

附录 Oozie Coordinator XML-Schema v0.4

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.2"
           elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.2">    
    <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/>
    <xs:element name="datasets" type="coordinator:DATASETS"/>
    <xs:simpleType name="IDENTIFIER">
        <xs:restriction base="xs:string">
            <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39})"/>
        </xs:restriction>
    </xs:simpleType>
    <xs:complexType name="COORDINATOR-APP">
        <xs:sequence>
            <xs:element name="parameters" type="coordinator:PARAMETERS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
        <xs:attribute name="frequency" type="xs:string" use="required"/>
        <xs:attribute name="start" type="xs:string" use="required"/>
        <xs:attribute name="end" type="xs:string" use="required"/>
        <xs:attribute name="timezone" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="PARAMETERS">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CONTROLS">
        <xs:sequence minOccurs="0" maxOccurs="1">
            <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="throttle" type="xs:string" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DATASETS">
        <xs:sequence minOccurs="0" maxOccurs="1">
            <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:choice minOccurs="0" maxOccurs="unbounded">
                <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/>
            </xs:choice>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="SYNCDATASET">
        <xs:sequence>
            <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
        <xs:attribute name="frequency" type="xs:string" use="required"/>
        <xs:attribute name="initial-instance" type="xs:string" use="required"/>
        <xs:attribute name="timezone" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="INPUTEVENTS">
        <xs:sequence minOccurs="1" maxOccurs="1">
            <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DATAIN">
        <xs:choice minOccurs="1" maxOccurs="1">
            <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
            <xs:sequence minOccurs="1" maxOccurs="1">
                <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
            </xs:sequence>
        </xs:choice>
        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
        <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="OUTPUTEVENTS">
        <xs:sequence minOccurs="1" maxOccurs="1">
            <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DATAOUT">
        <xs:sequence minOccurs="1" maxOccurs="1">
            <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
        <xs:attribute name="dataset" type="coordinator:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="ACTION">
        <xs:sequence minOccurs="1" maxOccurs="1">
            <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/>
            <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="WORKFLOW">
        <xs:sequence>
            <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="FLAG"/>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
</xs:schema>